package com.example.hotitemanalysis;

import com.example.bean.UserBehavior;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Flink streaming job that reads user-behavior records from a CSV text file,
 * counts {@code 'pv'} events per item over hopping event-time windows, and
 * prints the five items with the highest count for every window.
 *
 * <p>The aggregation and ranking are expressed as a single SQL statement that is
 * executed through the Blink planner.
 */
public class HotItemsWithPureSql {

    /** Input file used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "E:\\Gitee\\UserBehaviorAnalysis\\HotItemAnalysis\\src\\main\\resources\\UserBehavior.csv";

    /** Minimum number of comma-separated fields a line needs (indices 0..4 are read). */
    private static final int MIN_FIELD_COUNT = 5;

    /**
     * Top-N query, from the innermost sub-query outwards:
     * <ol>
     *   <li>count {@code 'pv'} rows per item and hopping window
     *       (HOP arguments: 5-minute slide, 1-hour size);</li>
     *   <li>number the rows of each window by descending count;</li>
     *   <li>keep the rows whose number is at most 5.</li>
     * </ol>
     */
    private static final String TOP_ITEMS_SQL =
            "select * from ("
                    + "select * ,ROW_NUMBER() over (PARTITION BY windowEnd order by cnt DESC) as row_num from ("
                    + "select itemId,count(itemId) as cnt, HOP_END(ts,interval '5' minute, interval '1' hour) as windowEnd"
                    + " from data_table where behavior = 'pv'"
                    + " group by itemId, HOP(ts,interval '5' minute, interval '1' hour)"
                    + "))"
                    + " where row_num <=5";

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} is the path of the input CSV file. When it is
     *             absent the built-in default path is used.
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> inputStream = env.readTextFile(inputPath);

        DataStream<UserBehavior> behaviorDataStream = inputStream
                .map(HotItemsWithPureSql::parseUserBehavior)
                // The ascending extractor is meant for input whose timestamps never decrease;
                // NOTE(review): this presumes the CSV is sorted by timestamp - confirm.
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior userBehavior) {
                        // Scale to milliseconds; presumably the file stores epoch seconds.
                        return userBehavior.getTimestamp() * 1000L;
                    }
                });

        // Create the table execution environment, using the Blink planner in streaming mode.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Pure SQL: expose the stream as a view, with the record timestamp as rowtime attribute "ts".
        tableEnv.createTemporaryView("data_table", behaviorDataStream, "itemId, behavior, timestamp.rowtime as ts");
        Table resultSqlTable = tableEnv.sqlQuery(TOP_ITEMS_SQL);

        // The ranking is revised as counts change, so the result is emitted as a retract stream.
        tableEnv.toRetractStream(resultSqlTable, Row.class).print();

        env.execute("job-hot items with pure sql");
    }

    /**
     * Parses one CSV line of the form {@code userId,itemId,categoryId,behavior,timestamp}.
     *
     * @param line raw input line
     * @return the populated record
     * @throws IllegalArgumentException if the line has fewer than five fields
     * @throws NumberFormatException if a numeric field cannot be parsed
     */
    private static UserBehavior parseUserBehavior(String line) {
        String[] fields = line.split(",");
        if (fields.length < MIN_FIELD_COUNT) {
            throw new IllegalArgumentException(
                    "Expected at least " + MIN_FIELD_COUNT + " comma-separated fields but got "
                            + fields.length + ": '" + line + "'");
        }

        UserBehavior userBehavior = new UserBehavior();
        // valueOf replaces the deprecated boxing constructors; parsing rules are the same.
        userBehavior.setUserId(Long.valueOf(fields[0]));
        userBehavior.setItemId(Long.valueOf(fields[1]));
        userBehavior.setCategoryId(Integer.valueOf(fields[2]));
        userBehavior.setBehavior(fields[3]);
        userBehavior.setTimestamp(Long.valueOf(fields[4]));
        return userBehavior;
    }
}
